# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, abstractmethod
from hysop.constants import Backend
from hysop.symbolic.base import DummySymbolicScalar, sm
from hysop.symbolic.constant import SymbolicConstant
from hysop.tools.htypes import check_instance, to_tuple, first_not_None
from hysop.tools.numpywrappers import npw
from hysop.tools.sympy_utils import subscript
from hysop.testsenv import __HAS_OPENCL_BACKEND__
if __HAS_OPENCL_BACKEND__:
from hysop.backend.device.opencl import clArray
from hysop.backend.device.opencl.opencl_array import OpenClArray
from hysop.backend.host.host_array import HostArray
class SymbolicMemoryObject(DummySymbolicScalar):
    """
    Symbolic scalar standing for a memory object (array or buffer).

    The underlying memory object can be given at construction or bound
    later with bind_memory_object(). Equality and hashing are based on
    object identity.
    """

    def __new__(cls, memory_object, name, **kwds):
        """
        Build the symbol and bind memory_object unless it is None.

        Extra keyword arguments are forwarded to the parent constructor.
        """
        obj = super().__new__(cls, name=name, **kwds)
        obj._memory_object = None
        if memory_object is not None:
            obj.bind_memory_object(memory_object)
        return obj

    @property
    def is_bound(self):
        """True once a memory object has been bound to this symbol."""
        return self._memory_object is not None

    def assert_bound(self):
        """Raise RuntimeError if no memory object has been bound yet."""
        if not self.is_bound:
            msg = "{}::{} memory object has not been bound yet."
            msg = msg.format(self.__class__.__name__, self.name)
            raise RuntimeError(msg)

    @property
    def memory_object(self):
        """The bound memory object; raises RuntimeError when unbound."""
        if self._memory_object is None:
            msg = f"Symbolic memory_object {self.name} has not been setup yet."
            raise RuntimeError(msg)
        return self._memory_object

    @abstractmethod
    def bind_memory_object(self, memory_object, force=False):
        """
        Bind memory_object to this symbol and return self.

        HostArray (and, when the OpenCL backend is available, OpenClArray)
        wrappers are replaced by their underlying handle before storage.

        Raises RuntimeError if a memory object is already bound and force
        is False.
        """
        if (not force) and (self._memory_object is not None):
            msg = (
                f"A memory_object has already been bound to SymbolicArray {self.name}."
            )
            raise RuntimeError(msg)
        # OpenClArray is only imported when the OpenCL backend is available,
        # so it may only be named under that flag. Host wrappers are unwrapped
        # in every configuration so that the stored object does not depend on
        # whether OpenCL is installed.
        wrapper_types = (HostArray,)
        if __HAS_OPENCL_BACKEND__:
            wrapper_types += (OpenClArray,)
        if isinstance(memory_object, wrapper_types):
            memory_object = memory_object.handle
        self._memory_object = memory_object
        return self

    @property
    def shape(self):
        """Shape of the bound memory object."""
        self.assert_bound()
        return self._memory_object.shape

    @property
    def size(self):
        """Size of the bound memory object."""
        self.assert_bound()
        return self._memory_object.size

    @property
    def strides(self):
        """Strides of the bound memory object."""
        self.assert_bound()
        return self._memory_object.strides

    @property
    def dtype(self):
        """dtype of the bound memory object."""
        self.assert_bound()
        return self._memory_object.dtype

    @property
    def ctype(self):
        """C type obtained by passing the bound dtype to dtype_to_ctype()."""
        # Imported locally, as in the original implementation.
        from hysop.backend.device.codegen.base.variables import dtype_to_ctype

        self.assert_bound()
        return dtype_to_ctype(self._memory_object.dtype)

    @property
    def dim(self):
        """Number of dimensions (ndim) of the bound memory object."""
        self.assert_bound()
        return self._memory_object.ndim

    def short_description(self):
        """
        Return a one-line summary (class name, dim, shape, strides, dtype).

        Raises RuntimeError if no memory object is bound.
        """
        self.assert_bound()
        # One replacement field per argument: the former literal 'shape=[]'
        # shifted every following value by one field and dropped the dtype.
        return "{}[dim={}, shape={}, strides={}, dtype={}]".format(
            self.__class__.__name__, self.dim, self.shape, self.strides, self.dtype
        )

    def __eq__(self, other):
        """Identity comparison: a symbol is only equal to itself."""
        return id(self) == id(other)

    def __hash__(self):
        """Identity hash, consistent with __eq__."""
        return id(self)

    def _hashable_content(self):
        """See sympy.core.basic.Basic._hashable_content()"""
        hc = super()._hashable_content()
        # Append the object identity to the parent's hashable content.
        hc += (str(id(self)),)
        return hc
class IndexedBuffer(sm.Indexed):
    """
    Marker type for an indexed SymbolicBuffer.
    """

    @property
    def indexed_object(self):
        """First argument of this expression's first argument."""
        base = self.args[0]
        return base.args[0]

    @property
    def index(self):
        """Second argument of the expression; exactly two are expected."""
        n_args = len(self.args)
        assert n_args == 2
        return self.args[1]

    @property
    def ctype(self):
        """ctype of the indexed object."""
        target = self.indexed_object
        return target.ctype
class SymbolicArray(SymbolicMemoryObject):
    """
    An array is considered to be indexed by local indices in
    autogenerated vectorized code (in symbolic code generation
    framework).
    """

    class ArrayRequirements:
        """Per-array requirements; currently a min_ghosts vector."""

        def __init__(self, sarray):
            """Start with zero ghosts on each of the sarray.dim axes."""
            self.min_ghosts = npw.int_zeros(shape=(sarray.dim,))
            self._symbolic_array = sarray

        def update_requirements(self, other):
            """Add the min_ghosts of other to this instance, in place."""
            check_instance(other, self.__class__)
            theirs = other.min_ghosts
            assert theirs.size == self.min_ghosts.size
            self.min_ghosts += theirs

    def __new__(cls, memory_object, name, dim=None, **kwds):
        """
        Create the symbolic array.

        The dimension is read from memory_object (its 'ndim' attribute,
        else its 'dim' attribute) when one is given; otherwise the dim
        argument is mandatory. RuntimeError is raised when no dimension
        can be determined.
        """
        obj = super().__new__(cls, memory_object=memory_object, name=name, **kwds)

        undetermined = (
            "Dimension could not be deduced from memory_object, "
            "please specify a dimension for symbolic array {}."
        ).format(name)

        if memory_object is not None:
            if hasattr(memory_object, "ndim"):
                dim = memory_object.ndim
            elif hasattr(memory_object, "dim"):
                dim = memory_object.dim
            else:
                raise RuntimeError(undetermined)
        elif dim is None:
            raise RuntimeError(undetermined)

        check_instance(dim, int)
        obj._dim = dim
        return obj

    def __getitem__(self, key):
        """Symbolic arrays cannot be subscripted: always raises RuntimeError."""
        raise RuntimeError(f"Symbolic array {self.name} cannot be indexed.")

    def new_requirements(self):
        """Return a fresh ArrayRequirements instance for this array."""
        return self.ArrayRequirements(self)

    @property
    def array(self):
        """The stored memory object (None while unbound)."""
        return self._memory_object

    @property
    def dim(self):
        """Dimension recorded at construction."""
        return self._dim

    def to_backend(self, backend):
        """
        Switch this instance's class to the variant matching backend.

        Returns self; raises NotImplementedError for any backend other
        than Backend.HOST and Backend.OPENCL.
        """
        if backend is Backend.HOST:
            target_cls = HostSymbolicArray
        elif backend is Backend.OPENCL:
            target_cls = OpenClSymbolicArray
        else:
            raise NotImplementedError(f"Unknown backend kind {backend}.")
        self.__class__ = target_cls
        return self
class SymbolicBuffer(SymbolicMemoryObject):
    """
    A buffer will not be indexed by local indices by default.
    The user has to index a SymbolicBuffer so that it can be
    used for code generation.
    """

    def __getitem__(self, key):
        """Return an IndexedBuffer for key (an int, npw.integer or sm.Expr)."""
        accepted = (int, npw.integer, sm.Expr)
        assert isinstance(key, accepted)
        return IndexedBuffer(self, key)

    @property
    def buffer(self):
        """The stored memory object (None while unbound)."""
        return self._memory_object

    def to_backend(self, backend):
        """
        Switch this instance's class to the variant matching backend.

        Returns self; raises NotImplementedError for any backend other
        than Backend.HOST and Backend.OPENCL.
        """
        if backend is Backend.HOST:
            target_cls = HostSymbolicBuffer
        elif backend is Backend.OPENCL:
            target_cls = OpenClSymbolicBuffer
        else:
            raise NotImplementedError(f"Unknown backend kind {backend}.")
        self.__class__ = target_cls
        return self
class SymbolicNdBuffer(SymbolicBuffer):
    """Same as a SymbolicBuffer, but with ndindex access and ghost support."""

    def __new__(
        cls,
        name,
        memory_object=None,
        dim=None,
        strides=None,
        dtype=None,
        ghosts=None,
        **kwds,
    ):
        """
        Create the buffer symbol with its symbolic strides and ghosts.

        The dimension is read from memory_object (its 'ndim' attribute,
        else its 'dim' attribute) when one is given; otherwise dim is
        mandatory. One SymbolicConstant per axis is created for strides
        ('s<i>') and for ghosts ('g<i>'), then bound from the explicit
        arguments or from the memory object's attributes.

        Raises RuntimeError when the dimension cannot be determined.
        """
        obj = super().__new__(cls, memory_object=memory_object, name=name, **kwds)
        msg = "Dimension could not be deduced from memory_object, "
        msg += "please specify a dimension for symbolic array {}."
        msg = msg.format(name)
        if memory_object is None:
            if dim is None:
                raise RuntimeError(msg)
        elif hasattr(memory_object, "ndim"):
            dim = memory_object.ndim
        elif hasattr(memory_object, "dim"):
            dim = memory_object.dim
        else:
            raise RuntimeError(msg)
        check_instance(dim, int)
        obj._dim = dim
        obj._symbolic_strides = tuple(
            SymbolicConstant(
                name=f"s{i}", pretty_name="s" + subscript(i), dtype=npw.int32
            )
            for i in range(dim)
        )
        obj._symbolic_ghosts = tuple(
            SymbolicConstant(
                name=f"g{i}", pretty_name="g" + subscript(i), dtype=npw.int32
            )
            for i in range(dim)
        )
        # Only from this point on do the symbolic constants exist, so only
        # now may update_symbolic_constants() actually bind values.
        obj._allow_update_symbolic_constants = True
        obj.update_symbolic_constants(
            memory_object=memory_object,
            strides=strides,
            dtype=dtype,
            ghosts=ghosts,
            force=False,
        )
        return obj

    def bind_memory_object(
        self, memory_object, strides=None, dtype=None, ghosts=None, force=False, **kwds
    ):
        """
        Bind memory_object, refresh the symbolic constants and return self.

        strides, dtype and ghosts override the values that would otherwise
        be read from memory_object; force is forwarded to both the parent
        binding and the constant update.
        """
        super().bind_memory_object(memory_object=memory_object, force=force, **kwds)
        self.update_symbolic_constants(
            memory_object=memory_object,
            strides=strides,
            dtype=dtype,
            ghosts=ghosts,
            force=force,
        )
        # Return the symbol like the parent implementation does, so that
        # callers relying on the returned value also work for nd-buffers.
        return self

    def update_symbolic_constants(self, memory_object, strides, dtype, ghosts, force):
        """
        Bind the per-axis stride and ghost constants.

        Each of strides, dtype and ghosts falls back to the attribute of
        the same name on memory_object when passed as None. Strides are
        divided by dtype.itemsize before being bound. Does nothing until
        __new__ has enabled updates.
        """
        # This method can be reached before __new__ has created the symbolic
        # constants, hence the guard on the enabling flag.
        if (
            hasattr(self, "_allow_update_symbolic_constants")
            and self._allow_update_symbolic_constants
        ):
            strides = first_not_None(strides, getattr(memory_object, "strides", None))
            dtype = first_not_None(dtype, getattr(memory_object, "dtype", None))
            assert (
                strides is not None
            ), "Could not determine strides from memory_object."
            assert dtype is not None, "Could not determine dtype from memory_object."
            itemsize = dtype.itemsize
            strides = to_tuple(strides)
            check_instance(strides, tuple, values=int, size=self._dim)
            for ss, si in zip(self._symbolic_strides, strides):
                # Each stride must be a whole number of items.
                assert si % itemsize == 0
                ss.bind_value(si // itemsize, force=force)
            ghosts = first_not_None(ghosts, getattr(memory_object, "ghosts", None))
            assert ghosts is not None, "Could not determine ghosts from memory_object."
            ghosts = to_tuple(ghosts)
            check_instance(ghosts, tuple, values=int, size=self._dim)
            for sg, gi in zip(self._symbolic_ghosts, ghosts):
                sg.bind_value(gi, force=force)

    @property
    def dim(self):
        """Dimension recorded at construction."""
        return self._dim

    @property
    def symbolic_strides(self):
        """Tuple of per-axis symbolic stride constants."""
        return self._symbolic_strides

    @property
    def symbolic_ghosts(self):
        """Tuple of per-axis symbolic ghost constants."""
        return self._symbolic_ghosts

    def __call__(self, *idx):
        """
        Index the buffer with an nd-index.

        idx is either dim separate indices or a single npw.ndarray holding
        dim entries. The linear key is dot(strides, idx + ghosts), built
        from the symbolic constants, and is passed to __getitem__.
        """
        if len(idx) == 1 and isinstance(idx[0], npw.ndarray):
            assert idx[0].size == self._dim, idx[0].shape
            idx = tuple(idx[0].ravel().tolist())
        assert len(idx) == self._dim, idx
        offset = npw.dot(self._symbolic_strides, npw.add(idx, self._symbolic_ghosts))
        return self.__getitem__(key=offset)
class SymbolicHostMemoryObject:
    """
    Mixin that only accepts host-side memory objects when binding.
    """

    def bind_memory_object(self, memory_object, **kwds):
        """
        Check that memory_object is a HostArray or a npw.ndarray, then
        delegate the binding to the next class in the MRO.
        """
        host_types = (HostArray, npw.ndarray)
        check_instance(memory_object, host_types)
        bound = super().bind_memory_object(memory_object, **kwds)
        return bound
class SymbolicDeviceMemoryObject:
    """
    Mixin that only accepts OpenCL memory objects when binding.
    """

    def bind_memory_object(self, memory_object, **kwds):
        """
        Check that memory_object is an OpenClArray or a clArray.Array, then
        delegate the binding to the next class in the MRO.
        """
        device_types = (OpenClArray, clArray.Array)
        check_instance(memory_object, device_types)
        bound = super().bind_memory_object(memory_object, **kwds)
        return bound

    @property
    def base_data(self):
        """base_data attribute of the bound memory object."""
        self.assert_bound()
        mem = self._memory_object
        return mem.base_data

    @property
    def offset(self):
        """offset attribute of the bound memory object."""
        self.assert_bound()
        mem = self._memory_object
        return mem.offset
class HostSymbolicArray(SymbolicHostMemoryObject, SymbolicArray):
    """SymbolicArray restricted to host memory objects."""
class HostSymbolicBuffer(SymbolicHostMemoryObject, SymbolicBuffer):
    """SymbolicBuffer restricted to host memory objects."""
class HostSymbolicNdBuffer(SymbolicHostMemoryObject, SymbolicNdBuffer):
    """SymbolicNdBuffer restricted to host memory objects."""
# The OpenCL variants are only defined when the OpenCL backend is available.
if __HAS_OPENCL_BACKEND__:

    class OpenClSymbolicArray(SymbolicDeviceMemoryObject, SymbolicArray):
        """SymbolicArray restricted to OpenCL memory objects."""

    class OpenClSymbolicBuffer(SymbolicDeviceMemoryObject, SymbolicBuffer):
        """SymbolicBuffer restricted to OpenCL memory objects."""

    class OpenClSymbolicNdBuffer(SymbolicDeviceMemoryObject, SymbolicNdBuffer):
        """SymbolicNdBuffer restricted to OpenCL memory objects."""
if __name__ == "__main__":
    # Small self-test run when the module is executed as a script.
    from hysop.core.arrays.all import default_host_array_backend

    # One raw array and one array allocated through the host array backend.
    a = npw.ones(shape=(10, 10), dtype=npw.int8)
    b = default_host_array_backend.zeros(shape=(10,), dtype=npw.uint16)

    # Symbolic arrays and buffers over both memory objects.
    A = HostSymbolicArray(a, name="a")
    B = b.as_symbolic_array("b")
    C = HostSymbolicBuffer(a, name="c")
    D = b.as_symbolic_buffer("d")

    # The symbols can be combined in an arithmetic expression.
    print(7 * A + 9 * B + 10 * C - 4 * D)

    # Each symbol must expose the memory object it was built from.
    assert A.array is a
    assert B.array is b.handle
    assert C.buffer is a
    assert D.buffer is b.handle

    # Indexing a symbolic array is expected to raise RuntimeError.
    try:
        A[5]
    except RuntimeError:
        pass
    else:
        assert False

    # Buffers can be indexed, including by another symbol.
    print(C[5])
    print(D[C])
    print()